1   /*
2    * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
3    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4    *
5    * This code is free software; you can redistribute it and/or modify it
6    * under the terms of the GNU General Public License version 2 only, as
7    * published by the Free Software Foundation.  Oracle designates this
8    * particular file as subject to the "Classpath" exception as provided
9    * by Oracle in the LICENSE file that accompanied this code.
10   *
11   * This code is distributed in the hope that it will be useful, but WITHOUT
12   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13   * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14   * version 2 for more details (a copy is included in the LICENSE file that
15   * accompanied this code).
16   *
17   * You should have received a copy of the GNU General Public License version
18   * 2 along with this work; if not, write to the Free Software Foundation,
19   * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20   *
21   * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22   * or visit www.oracle.com if you need additional information or have any
23   * questions.
24   */
25  package sun.nio.ch;
26  
27  import java.net.InetAddress;
28  import java.net.SocketAddress;
29  import java.net.SocketException;
30  import java.net.InetSocketAddress;
31  import java.io.FileDescriptor;
32  import java.io.IOException;
33  import java.util.Collections;
34  import java.util.Set;
35  import java.util.HashSet;
36  import java.nio.ByteBuffer;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.ConnectionPendingException;
40  import java.nio.channels.NoConnectionPendingException;
41  import java.nio.channels.AlreadyConnectedException;
42  import java.nio.channels.NotYetBoundException;
43  import java.nio.channels.NotYetConnectedException;
44  import java.nio.channels.spi.SelectorProvider;
45  import com.sun.nio.sctp.AbstractNotificationHandler;
46  import com.sun.nio.sctp.Association;
47  import com.sun.nio.sctp.AssociationChangeNotification;
48  import com.sun.nio.sctp.HandlerResult;
49  import com.sun.nio.sctp.IllegalReceiveException;
50  import com.sun.nio.sctp.InvalidStreamException;
51  import com.sun.nio.sctp.IllegalUnbindException;
52  import com.sun.nio.sctp.MessageInfo;
53  import com.sun.nio.sctp.NotificationHandler;
54  import com.sun.nio.sctp.SctpChannel;
55  import com.sun.nio.sctp.SctpSocketOption;
56  import sun.nio.ch.PollArrayWrapper;
57  import sun.nio.ch.SelChImpl;
58  import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
59  import static sun.nio.ch.SctpResultContainer.SEND_FAILED;
60  import static sun.nio.ch.SctpResultContainer.ASSOCIATION_CHANGED;
61  import static sun.nio.ch.SctpResultContainer.PEER_ADDRESS_CHANGED;
62  import static sun.nio.ch.SctpResultContainer.SHUTDOWN;
63  
64  /**
65   * An implementation of an SctpChannel
66   */
67  public class SctpChannelImpl extends SctpChannel
68      implements SelChImpl
69  {
70      private final FileDescriptor fd;
71  
72      private final int fdVal;
73  
74      /* IDs of native threads doing send and receivess, for signalling */
75      private volatile long receiverThread = 0;
76      private volatile long senderThread = 0;
77  
78      /* Lock held by current receiving or connecting thread */
79      private final Object receiveLock = new Object();
80  
81      /* Lock held by current sending or connecting thread */
82      private final Object sendLock = new Object();
83  
84      private final ThreadLocal<Boolean> receiveInvoked =
85          new ThreadLocal<Boolean>() {
86               @Override protected Boolean initialValue() {
87                   return Boolean.FALSE;
88              }
89      };
90  
91      /* Lock held by any thread that modifies the state fields declared below
92         DO NOT invoke a blocking I/O operation while holding this lock! */
93      private final Object stateLock = new Object();
94  
95      private enum ChannelState {
96          UNINITIALIZED,
97          UNCONNECTED,
98          PENDING,
99          CONNECTED,
100         KILLPENDING,
101         KILLED,
102     }
103     /* -- The following fields are protected by stateLock -- */
104     private ChannelState state = ChannelState.UNINITIALIZED;
105 
106     /* Binding; Once bound the port will remain constant. */
107     int port = -1;
108     private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
109     /* Has the channel been bound to the wildcard address */
110     private boolean wildcard; /* false */
111     //private InetSocketAddress remoteAddress = null;
112 
113     /* Input/Output open */
114     private boolean readyToConnect;
115 
116     /* Shutdown */
117     private boolean isShutdown;
118 
119     private Association association;
120 
121     private Set<SocketAddress> remoteAddresses = Collections.EMPTY_SET;
122 
123     /* -- End of fields protected by stateLock -- */
124 
125     /**
126      * Constructor for normal connecting sockets
127      */
128     public SctpChannelImpl(SelectorProvider provider) throws IOException {
129         //TODO: update provider remove public modifier
130         super(provider);
131         this.fd = SctpNet.socket(true);
132         this.fdVal = IOUtil.fdVal(fd);
133         this.state = ChannelState.UNCONNECTED;
134     }
135 
136     /**
137      * Constructor for sockets obtained from server sockets
138      */
139     public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
140          throws IOException {
141         this(provider, fd, null);
142     }
143 
144     /**
145      * Constructor for sockets obtained from branching
146      */
147     public SctpChannelImpl(SelectorProvider provider,
148                            FileDescriptor fd,
149                            Association association)
150             throws IOException {
151         super(provider);
152         this.fd = fd;
153         this.fdVal = IOUtil.fdVal(fd);
154         this.state = ChannelState.CONNECTED;
155         port = (Net.localAddress(fd)).getPort();
156 
157         if (association != null) { /* branched */
158             this.association = association;
159         } else { /* obtained from server channel */
160             /* Receive COMM_UP */
161             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
162             try {
163                 receive(buf, null, null, true);
164             } finally {
165                 Util.releaseTemporaryDirectBuffer(buf);
166             }
167         }
168     }
169 
170     /**
171      * Binds the channel's socket to a local address.
172      */
173     @Override
174     public SctpChannel bind(SocketAddress local) throws IOException {
175         synchronized (receiveLock) {
176             synchronized (sendLock) {
177                 synchronized (stateLock) {
178                     ensureOpenAndUnconnected();
179                     if (isBound())
180                         SctpNet.throwAlreadyBoundException();
181                     InetSocketAddress isa = (local == null) ?
182                         new InetSocketAddress(0) : Net.checkAddress(local);
183                     Net.bind(fd, isa.getAddress(), isa.getPort());
184                     InetSocketAddress boundIsa = Net.localAddress(fd);
185                     port = boundIsa.getPort();
186                     localAddresses.add(isa);
187                     if (isa.getAddress().isAnyLocalAddress())
188                         wildcard = true;
189                 }
190             }
191         }
192         return this;
193     }
194 
195     @Override
196     public SctpChannel bindAddress(InetAddress address)
197             throws IOException {
198         bindUnbindAddress(address, true);
199         localAddresses.add(new InetSocketAddress(address, port));
200         return this;
201     }
202 
203     @Override
204     public SctpChannel unbindAddress(InetAddress address)
205             throws IOException {
206         bindUnbindAddress(address, false);
207         localAddresses.remove(new InetSocketAddress(address, port));
208         return this;
209     }
210 
211     private SctpChannel bindUnbindAddress(InetAddress address, boolean add)
212             throws IOException {
213         if (address == null)
214             throw new IllegalArgumentException();
215 
216         synchronized (receiveLock) {
217             synchronized (sendLock) {
218                 synchronized (stateLock) {
219                     if (!isOpen())
220                         throw new ClosedChannelException();
221                     if (!isBound())
222                         throw new NotYetBoundException();
223                     if (wildcard)
224                         throw new IllegalStateException(
225                                 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
226                     if (address.isAnyLocalAddress())
227                         throw new IllegalArgumentException(
228                                 "Cannot add or remove the wildcard address");
229                     if (add) {
230                         for (InetSocketAddress addr : localAddresses) {
231                             if (addr.getAddress().equals(address)) {
232                                 SctpNet.throwAlreadyBoundException();
233                             }
234                         }
235                     } else { /*removing */
236                         /* Verify that there is more than one address
237                          * and that address is already bound */
238                         if (localAddresses.size() <= 1)
239                             throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
240                         boolean foundAddress = false;
241                         for (InetSocketAddress addr : localAddresses) {
242                             if (addr.getAddress().equals(address)) {
243                                 foundAddress = true;
244                                 break;
245                             }
246                         }
247                         if (!foundAddress )
248                             throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
249                     }
250 
251                     SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
252 
253                     /* Update our internal Set to reflect the addition/removal */
254                     if (add)
255                         localAddresses.add(new InetSocketAddress(address, port));
256                     else {
257                         for (InetSocketAddress addr : localAddresses) {
258                             if (addr.getAddress().equals(address)) {
259                                 localAddresses.remove(addr);
260                                 break;
261                             }
262                         }
263                     }
264                 }
265             }
266         }
267         return this;
268     }
269 
270     private boolean isBound() {
271         synchronized (stateLock) {
272             return port == -1 ? false : true;
273         }
274     }
275 
276     private boolean isConnected() {
277         synchronized (stateLock) {
278             return (state == ChannelState.CONNECTED);
279         }
280     }
281 
282     private void ensureOpenAndUnconnected() throws IOException {
283         synchronized (stateLock) {
284             if (!isOpen())
285                 throw new ClosedChannelException();
286             if (isConnected())
287                 throw new AlreadyConnectedException();
288             if (state == ChannelState.PENDING)
289                 throw new ConnectionPendingException();
290         }
291     }
292 
293     private boolean ensureReceiveOpen() throws ClosedChannelException {
294         synchronized (stateLock) {
295             if (!isOpen())
296                 throw new ClosedChannelException();
297             if (!isConnected())
298                 throw new NotYetConnectedException();
299             else
300                 return true;
301         }
302     }
303 
304     private void ensureSendOpen() throws ClosedChannelException {
305         synchronized (stateLock) {
306             if (!isOpen())
307                 throw new ClosedChannelException();
308             if (isShutdown)
309                 throw new ClosedChannelException();
310             if (!isConnected())
311                 throw new NotYetConnectedException();
312         }
313     }
314 
315     private void receiverCleanup() throws IOException {
316         synchronized (stateLock) {
317             receiverThread = 0;
318             if (state == ChannelState.KILLPENDING)
319                 kill();
320         }
321     }
322 
323     private void senderCleanup() throws IOException {
324         synchronized (stateLock) {
325             senderThread = 0;
326             if (state == ChannelState.KILLPENDING)
327                 kill();
328         }
329     }
330 
331     @Override
332     public Association association() throws ClosedChannelException {
333         synchronized (stateLock) {
334             if (!isOpen())
335                 throw new ClosedChannelException();
336             if (!isConnected())
337                 return null;
338 
339             return association;
340         }
341     }
342 
343     @Override
344     public boolean connect(SocketAddress endpoint) throws IOException {
345         synchronized (receiveLock) {
346             synchronized (sendLock) {
347                 ensureOpenAndUnconnected();
348                 InetSocketAddress isa = Net.checkAddress(endpoint);
349                 SecurityManager sm = System.getSecurityManager();
350                 if (sm != null)
351                     sm.checkConnect(isa.getAddress().getHostAddress(),
352                                     isa.getPort());
353                 synchronized (blockingLock()) {
354                     int n = 0;
355                     try {
356                         try {
357                             begin();
358                             synchronized (stateLock) {
359                                 if (!isOpen()) {
360                                     return false;
361                                 }
362                                 receiverThread = NativeThread.current();
363                             }
364                             for (;;) {
365                                 InetAddress ia = isa.getAddress();
366                                 if (ia.isAnyLocalAddress())
367                                     ia = InetAddress.getLocalHost();
368                                 n = SctpNet.connect(fdVal, ia, isa.getPort());
369                                 if (  (n == IOStatus.INTERRUPTED)
370                                       && isOpen())
371                                     continue;
372                                 break;
373                             }
374                         } finally {
375                             receiverCleanup();
376                             end((n > 0) || (n == IOStatus.UNAVAILABLE));
377                             assert IOStatus.check(n);
378                         }
379                     } catch (IOException x) {
380                         /* If an exception was thrown, close the channel after
381                          * invoking end() so as to avoid bogus
382                          * AsynchronousCloseExceptions */
383                         close();
384                         throw x;
385                     }
386 
387                     if (n > 0) {
388                         synchronized (stateLock) {
389                             /* Connection succeeded */
390                             state = ChannelState.CONNECTED;
391                             if (!isBound()) {
392                                 InetSocketAddress boundIsa =
393                                         Net.localAddress(fd);
394                                 port = boundIsa.getPort();
395                             }
396 
397                             /* Receive COMM_UP */
398                             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
399                             try {
400                                 receive(buf, null, null, true);
401                             } finally {
402                                 Util.releaseTemporaryDirectBuffer(buf);
403                             }
404 
405                             /* cache remote addresses */
406                             try {
407                                 remoteAddresses = getRemoteAddresses();
408                             } catch (IOException unused) { /* swallow exception */ }
409 
410                             return true;
411                         }
412                     } else  {
413                         synchronized (stateLock) {
414                             /* If nonblocking and no exception then connection
415                              * pending; disallow another invocation */
416                             if (!isBlocking())
417                                 state = ChannelState.PENDING;
418                             else
419                                 assert false;
420                         }
421                     }
422                 }
423                 return false;
424             }
425         }
426     }
427 
428     @Override
429     public boolean connect(SocketAddress endpoint,
430                            int maxOutStreams,
431                            int maxInStreams)
432             throws IOException {
433         ensureOpenAndUnconnected();
434         return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
435                 create(maxInStreams, maxOutStreams)).connect(endpoint);
436 
437     }
438 
439     @Override
440     public boolean isConnectionPending() {
441         synchronized (stateLock) {
442             return (state == ChannelState.PENDING);
443         }
444     }
445 
446     @Override
447     public boolean finishConnect() throws IOException {
448         synchronized (receiveLock) {
449             synchronized (sendLock) {
450                 synchronized (stateLock) {
451                     if (!isOpen())
452                         throw new ClosedChannelException();
453                     if (isConnected())
454                         return true;
455                     if (state != ChannelState.PENDING)
456                         throw new NoConnectionPendingException();
457                 }
458                 int n = 0;
459                 try {
460                     try {
461                         begin();
462                         synchronized (blockingLock()) {
463                             synchronized (stateLock) {
464                                 if (!isOpen()) {
465                                     return false;
466                                 }
467                                 receiverThread = NativeThread.current();
468                             }
469                             if (!isBlocking()) {
470                                 for (;;) {
471                                     n = checkConnect(fd, false, readyToConnect);
472                                     if (  (n == IOStatus.INTERRUPTED)
473                                           && isOpen())
474                                         continue;
475                                     break;
476                                 }
477                             } else {
478                                 for (;;) {
479                                     n = checkConnect(fd, true, readyToConnect);
480                                     if (n == 0) {
481                                         // Loop in case of
482                                         // spurious notifications
483                                         continue;
484                                     }
485                                     if (  (n == IOStatus.INTERRUPTED)
486                                           && isOpen())
487                                         continue;
488                                     break;
489                                 }
490                             }
491                         }
492                     } finally {
493                         synchronized (stateLock) {
494                             receiverThread = 0;
495                             if (state == ChannelState.KILLPENDING) {
496                                 kill();
497                                 /* poll()/getsockopt() does not report
498                                  * error (throws exception, with n = 0)
499                                  * on Linux platform after dup2 and
500                                  * signal-wakeup. Force n to 0 so the
501                                  * end() can throw appropriate exception */
502                                 n = 0;
503                             }
504                         }
505                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
506                         assert IOStatus.check(n);
507                     }
508                 } catch (IOException x) {
509                     /* If an exception was thrown, close the channel after
510                      * invoking end() so as to avoid bogus
511                      * AsynchronousCloseExceptions */
512                     close();
513                     throw x;
514                 }
515 
516                 if (n > 0) {
517                     synchronized (stateLock) {
518                         state = ChannelState.CONNECTED;
519                         if (!isBound()) {
520                             InetSocketAddress boundIsa =
521                                     Net.localAddress(fd);
522                             port = boundIsa.getPort();
523                         }
524 
525                         /* Receive COMM_UP */
526                         ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
527                         try {
528                             receive(buf, null, null, true);
529                         } finally {
530                             Util.releaseTemporaryDirectBuffer(buf);
531                         }
532 
533                         /* cache remote addresses */
534                         try {
535                             remoteAddresses = getRemoteAddresses();
536                         } catch (IOException unused) { /* swallow exception */ }
537 
538                         return true;
539                     }
540                 }
541             }
542         }
543         return false;
544     }
545 
546     @Override
547     protected void implConfigureBlocking(boolean block) throws IOException {
548         IOUtil.configureBlocking(fd, block);
549     }
550 
551     @Override
552     public void implCloseSelectableChannel() throws IOException {
553         synchronized (stateLock) {
554             SctpNet.preClose(fdVal);
555 
556             if (receiverThread != 0)
557                 NativeThread.signal(receiverThread);
558 
559             if (senderThread != 0)
560                 NativeThread.signal(senderThread);
561 
562             if (!isRegistered())
563                 kill();
564         }
565     }
566 
567     @Override
568     public FileDescriptor getFD() {
569         return fd;
570     }
571 
572     @Override
573     public int getFDVal() {
574         return fdVal;
575     }
576 
577     /**
578      * Translates native poll revent ops into a ready operation ops
579      */
580     private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
581         int intOps = sk.nioInterestOps();
582         int oldOps = sk.nioReadyOps();
583         int newOps = initialOps;
584 
585         if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
586             /* This should only happen if this channel is pre-closed while a
587              * selection operation is in progress
588              * ## Throw an error if this channel has not been pre-closed */
589             return false;
590         }
591 
592         if ((ops & (PollArrayWrapper.POLLERR
593                     | PollArrayWrapper.POLLHUP)) != 0) {
594             newOps = intOps;
595             sk.nioReadyOps(newOps);
596             /* No need to poll again in checkConnect,
597              * the error will be detected there */
598             readyToConnect = true;
599             return (newOps & ~oldOps) != 0;
600         }
601 
602         if (((ops & PollArrayWrapper.POLLIN) != 0) &&
603             ((intOps & SelectionKey.OP_READ) != 0) &&
604             isConnected())
605             newOps |= SelectionKey.OP_READ;
606 
607         if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
608             ((intOps & SelectionKey.OP_CONNECT) != 0) &&
609             ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
610             newOps |= SelectionKey.OP_CONNECT;
611             readyToConnect = true;
612         }
613 
614         if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
615             ((intOps & SelectionKey.OP_WRITE) != 0) &&
616             isConnected())
617             newOps |= SelectionKey.OP_WRITE;
618 
619         sk.nioReadyOps(newOps);
620         return (newOps & ~oldOps) != 0;
621     }
622 
623     @Override
624     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
625         return translateReadyOps(ops, sk.nioReadyOps(), sk);
626     }
627 
628     @Override
629     @SuppressWarnings("all")
630     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
631         return translateReadyOps(ops, 0, sk);
632     }
633 
634     @Override
635     public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
636         int newOps = 0;
637         if ((ops & SelectionKey.OP_READ) != 0)
638             newOps |= PollArrayWrapper.POLLIN;
639         if ((ops & SelectionKey.OP_WRITE) != 0)
640             newOps |= PollArrayWrapper.POLLOUT;
641         if ((ops & SelectionKey.OP_CONNECT) != 0)
642             newOps |= PollArrayWrapper.POLLCONN;
643         sk.selector.putEventOps(sk, newOps);
644     }
645 
646     @Override
647     public void kill() throws IOException {
648         synchronized (stateLock) {
649             if (state == ChannelState.KILLED)
650                 return;
651             if (state == ChannelState.UNINITIALIZED) {
652                 state = ChannelState.KILLED;
653                 return;
654             }
655             assert !isOpen() && !isRegistered();
656 
657             /* Postpone the kill if there is a waiting reader
658              * or writer thread. */
659             if (receiverThread == 0 && senderThread == 0) {
660                 SctpNet.close(fdVal);
661                 state = ChannelState.KILLED;
662             } else {
663                 state = ChannelState.KILLPENDING;
664             }
665         }
666     }
667 
668     @Override
669     public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
670             throws IOException {
671         if (name == null)
672             throw new NullPointerException();
673         if (!supportedOptions().contains(name))
674             throw new UnsupportedOperationException("'" + name + "' not supported");
675 
676         synchronized (stateLock) {
677             if (!isOpen())
678                 throw new ClosedChannelException();
679 
680             SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);
681         }
682         return this;
683     }
684 
685     @Override
686     @SuppressWarnings("unchecked")
687     public <T> T getOption(SctpSocketOption<T> name) throws IOException {
688         if (name == null)
689             throw new NullPointerException();
690         if (!supportedOptions().contains(name))
691             throw new UnsupportedOperationException("'" + name + "' not supported");
692 
693         synchronized (stateLock) {
694             if (!isOpen())
695                 throw new ClosedChannelException();
696 
697             return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);
698         }
699     }
700 
701     private static class DefaultOptionsHolder {
702         static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
703 
704         private static Set<SctpSocketOption<?>> defaultOptions() {
705             HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
706             set.add(SCTP_DISABLE_FRAGMENTS);
707             set.add(SCTP_EXPLICIT_COMPLETE);
708             set.add(SCTP_FRAGMENT_INTERLEAVE);
709             set.add(SCTP_INIT_MAXSTREAMS);
710             set.add(SCTP_NODELAY);
711             set.add(SCTP_PRIMARY_ADDR);
712             set.add(SCTP_SET_PEER_PRIMARY_ADDR);
713             set.add(SO_SNDBUF);
714             set.add(SO_RCVBUF);
715             set.add(SO_LINGER);
716             return Collections.unmodifiableSet(set);
717         }
718     }
719 
720     @Override
721     public final Set<SctpSocketOption<?>> supportedOptions() {
722         return DefaultOptionsHolder.defaultOptions;
723     }
724 
725     @Override
726     public <T> MessageInfo receive(ByteBuffer buffer,
727                                    T attachment,
728                                    NotificationHandler<T> handler)
729             throws IOException {
730         return receive(buffer, attachment, handler, false);
731     }
732 
733     private <T> MessageInfo receive(ByteBuffer buffer,
734                                     T attachment,
735                                     NotificationHandler<T> handler,
736                                     boolean fromConnect)
737             throws IOException {
738         if (buffer == null)
739             throw new IllegalArgumentException("buffer cannot be null");
740 
741         if (buffer.isReadOnly())
742             throw new IllegalArgumentException("Read-only buffer");
743 
744         if (receiveInvoked.get())
745             throw new IllegalReceiveException(
746                     "cannot invoke receive from handler");
747         receiveInvoked.set(Boolean.TRUE);
748 
749         try {
750             SctpResultContainer resultContainer = new SctpResultContainer();
751             do {
752                 resultContainer.clear();
753                 synchronized (receiveLock) {
754                     if (!ensureReceiveOpen())
755                         return null;
756 
757                     int n = 0;
758                     try {
759                         begin();
760 
761                         synchronized (stateLock) {
762                             if(!isOpen())
763                                 return null;
764                             receiverThread = NativeThread.current();
765                         }
766 
767                         do {
768                             n = receive(fdVal, buffer, resultContainer, fromConnect);
769                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
770                     } finally {
771                         receiverCleanup();
772                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
773                         assert IOStatus.check(n);
774                     }
775 
776                     if (!resultContainer.isNotification()) {
777                         /* message or nothing */
778                         if (resultContainer.hasSomething()) {
779                             /* Set the association before returning */
780                             SctpMessageInfoImpl info =
781                                     resultContainer.getMessageInfo();
782                             synchronized (stateLock) {
783                                 assert association != null;
784                                 info.setAssociation(association);
785                             }
786                             return info;
787                         } else
788                             /* Non-blocking may return null if nothing available*/
789                             return null;
790                     } else { /* notification */
791                         synchronized (stateLock) {
792                             handleNotificationInternal(
793                                     resultContainer);
794                         }
795                     }
796 
797                     if (fromConnect)  {
798                         /* If we reach here, then it was connect that invoked
799                          * receive and received the COMM_UP. We have already
800                          * handled the COMM_UP with the internal notification
801                          * handler. Simply return. */
802                         return null;
803                     }
804                 }  /* receiveLock */
805             } while (handler == null ? true :
806                 (invokeNotificationHandler(resultContainer, handler, attachment)
807                  == HandlerResult.CONTINUE));
808 
809             return null;
810         } finally {
811             receiveInvoked.set(Boolean.FALSE);
812         }
813     }
814 
815     private int receive(int fd,
816                         ByteBuffer dst,
817                         SctpResultContainer resultContainer,
818                         boolean peek)
819             throws IOException {
820         int pos = dst.position();
821         int lim = dst.limit();
822         assert (pos <= lim);
823         int rem = (pos <= lim ? lim - pos : 0);
824         if (dst instanceof DirectBuffer && rem > 0)
825             return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
826 
827         /* Substitute a native buffer */
828         int newSize = Math.max(rem, 1);
829         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
830         try {
831             int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
832             bb.flip();
833             if (n > 0 && rem > 0)
834                 dst.put(bb);
835             return n;
836         } finally {
837             Util.releaseTemporaryDirectBuffer(bb);
838         }
839     }
840 
841     private int receiveIntoNativeBuffer(int fd,
842                                         SctpResultContainer resultContainer,
843                                         ByteBuffer bb,
844                                         int rem,
845                                         int pos,
846                                         boolean peek)
847         throws IOException
848     {
849         int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
850 
851         if (n > 0)
852             bb.position(pos + n);
853         return n;
854     }
855 
856     private InternalNotificationHandler<?> internalNotificationHandler =
857             new InternalNotificationHandler();
858 
859     private void handleNotificationInternal(SctpResultContainer resultContainer)
860     {
861         invokeNotificationHandler(resultContainer,
862                 internalNotificationHandler, null);
863     }
864 
865     private class InternalNotificationHandler<T>
866             extends AbstractNotificationHandler<T>
867     {
868         @Override
869         public HandlerResult handleNotification(
870                 AssociationChangeNotification not, T unused) {
871             if (not.event().equals(
872                     AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&
873                     association == null) {
874                 SctpAssocChange sac = (SctpAssocChange) not;
875                 association = new SctpAssociationImpl
876                        (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
877             }
878             return HandlerResult.CONTINUE;
879         }
880     }
881 
882     private <T> HandlerResult invokeNotificationHandler
883                                  (SctpResultContainer resultContainer,
884                                   NotificationHandler<T> handler,
885                                   T attachment) {
886         SctpNotification notification = resultContainer.notification();
887         synchronized (stateLock) {
888             notification.setAssociation(association);
889         }
890 
891         if (!(handler instanceof AbstractNotificationHandler)) {
892             return handler.handleNotification(notification, attachment);
893         }
894 
895         /* AbstractNotificationHandler */
896         AbstractNotificationHandler absHandler =
897                 (AbstractNotificationHandler)handler;
898         switch(resultContainer.type()) {
899             case ASSOCIATION_CHANGED :
900                 return absHandler.handleNotification(
901                         resultContainer.getAssociationChanged(), attachment);
902             case PEER_ADDRESS_CHANGED :
903                 return absHandler.handleNotification(
904                         resultContainer.getPeerAddressChanged(), attachment);
905             case SEND_FAILED :
906                 return absHandler.handleNotification(
907                         resultContainer.getSendFailed(), attachment);
908             case SHUTDOWN :
909                 return absHandler.handleNotification(
910                         resultContainer.getShutdown(), attachment);
911             default :
912                 /* implementation specific handlers */
913                 return absHandler.handleNotification(
914                         resultContainer.notification(), attachment);
915         }
916     }
917 
918     private void checkAssociation(Association sendAssociation) {
919         synchronized (stateLock) {
920             if (sendAssociation != null && !sendAssociation.equals(association)) {
921                 throw new IllegalArgumentException(
922                         "Cannot send to another association");
923             }
924         }
925     }
926 
927     private void checkStreamNumber(int streamNumber) {
928         synchronized (stateLock) {
929             if (association != null) {
930                 if (streamNumber < 0 ||
931                       streamNumber >= association.maxOutboundStreams())
932                     throw new InvalidStreamException();
933             }
934         }
935     }
936 
937     /* TODO: Add support for ttl and isComplete to both 121 12M
938      *       SCTP_EOR not yet supported on reference platforms
939      *       TTL support limited...
940      */
941     @Override
942     public int send(ByteBuffer buffer, MessageInfo messageInfo)
943             throws IOException {
944         if (buffer == null)
945             throw new IllegalArgumentException("buffer cannot be null");
946 
947         if (messageInfo == null)
948             throw new IllegalArgumentException("messageInfo cannot be null");
949 
950         checkAssociation(messageInfo.association());
951         checkStreamNumber(messageInfo.streamNumber());
952 
953         synchronized (sendLock) {
954             ensureSendOpen();
955 
956             int n = 0;
957             try {
958                 begin();
959 
960                 synchronized (stateLock) {
961                     if(!isOpen())
962                         return 0;
963                     senderThread = NativeThread.current();
964                 }
965 
966                 do {
967                     n = send(fdVal, buffer, messageInfo);
968                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
969 
970                 return IOStatus.normalize(n);
971             } finally {
972                 senderCleanup();
973                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
974                 assert IOStatus.check(n);
975             }
976         }
977     }
978 
979     private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
980             throws IOException {
981         int streamNumber = messageInfo.streamNumber();
982         SocketAddress target = messageInfo.address();
983         boolean unordered = messageInfo.isUnordered();
984         int ppid = messageInfo.payloadProtocolID();
985 
986         if (src instanceof DirectBuffer)
987             return sendFromNativeBuffer(fd, src, target, streamNumber,
988                     unordered, ppid);
989 
990         /* Substitute a native buffer */
991         int pos = src.position();
992         int lim = src.limit();
993         assert (pos <= lim && streamNumber >= 0);
994 
995         int rem = (pos <= lim ? lim - pos : 0);
996         ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
997         try {
998             bb.put(src);
999             bb.flip();
1000             /* Do not update src until we see how many bytes were written */
1001             src.position(pos);
1002 
1003             int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
1004                     unordered, ppid);
1005             if (n > 0) {
1006                 /* now update src */
1007                 src.position(pos + n);
1008             }
1009             return n;
1010         } finally {
1011             Util.releaseTemporaryDirectBuffer(bb);
1012         }
1013     }
1014 
1015     private int sendFromNativeBuffer(int fd,
1016                                      ByteBuffer bb,
1017                                      SocketAddress target,
1018                                      int streamNumber,
1019                                      boolean unordered,
1020                                      int ppid)
1021             throws IOException {
1022         int pos = bb.position();
1023         int lim = bb.limit();
1024         assert (pos <= lim);
1025         int rem = (pos <= lim ? lim - pos : 0);
1026 
1027         int written = send0(fd, ((DirectBuffer)bb).address() + pos,
1028                             rem, target, -1 /*121*/, streamNumber, unordered, ppid);
1029         if (written > 0)
1030             bb.position(pos + written);
1031         return written;
1032     }
1033 
1034     @Override
1035     public SctpChannel shutdown() throws IOException {
1036         synchronized(stateLock) {
1037             if (isShutdown)
1038                 return this;
1039 
1040             ensureSendOpen();
1041             SctpNet.shutdown(fdVal, -1);
1042             if (senderThread != 0)
1043                 NativeThread.signal(senderThread);
1044             isShutdown = true;
1045         }
1046         return this;
1047     }
1048 
1049     @Override
1050     public Set<SocketAddress> getAllLocalAddresses()
1051             throws IOException {
1052         synchronized (stateLock) {
1053             if (!isOpen())
1054                 throw new ClosedChannelException();
1055             if (!isBound())
1056                 return Collections.EMPTY_SET;
1057 
1058             return SctpNet.getLocalAddresses(fdVal);
1059         }
1060     }
1061 
1062     @Override
1063     public Set<SocketAddress> getRemoteAddresses()
1064             throws IOException {
1065         synchronized (stateLock) {
1066             if (!isOpen())
1067                 throw new ClosedChannelException();
1068             if (!isConnected() || isShutdown)
1069                 return Collections.EMPTY_SET;
1070 
1071             try {
1072                 return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
1073             } catch (SocketException unused) {
1074                 /* an open connected channel should always have remote addresses */
1075                 return remoteAddresses;
1076             }
1077         }
1078     }
1079 
1080     /* Native */
1081     private static native void initIDs();
1082 
1083     static native int receive0(int fd, SctpResultContainer resultContainer,
1084             long address, int length, boolean peek) throws IOException;
1085 
1086     static native int send0(int fd, long address, int length,
1087             SocketAddress target, int assocId, int streamNumber,
1088             boolean unordered, int ppid) throws IOException;
1089 
1090     private static native int checkConnect(FileDescriptor fd, boolean block,
1091             boolean ready) throws IOException;
1092 
1093     static {
1094         Util.load();   /* loads nio & net native libraries */
1095         java.security.AccessController.doPrivileged(
1096                 new sun.security.action.LoadLibraryAction("sctp"));
1097         initIDs();
1098     }
1099 }